/**
 * FileName: JoinAnotherDataFilterImpl
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/3/19 19:49
 * Description: 关联外部数据
 */
package cn.com.bonc.process.impl;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import cn.com.bonc.process.Process;
import cn.com.bonc.util.ProcessCfgUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JoinExternalDataProcessImpl implements Process {

    private SparkSession sparkSession;

    public JoinExternalDataProcessImpl(SparkSession sparkSession) {
        this.sparkSession = sparkSession;
    }

    @Override
    public Dataset<Row> processing(Dataset<Row> rowDataset) {
        if (ProcessCfgUtil.isJoin()){
            String name = ProcessCfgUtil.getJoinCapitalName();
            if (name.equals("A_REDIS")){
                sparkSession.read()
                        .format("org.apache.spark.sql.redis")
                        .option("table", "Xdata")
                        .load()
                        .registerTempTable("A_REDIS");
            }
            if (name.equals("A_HDFS")){
                sparkSession.readStream()
                        .textFile(ConfigurationManager.getProperty(Constants.HDFS_STATIC_PATH))
                        .registerTempTable("A_HDFS");
            }
        }
        return rowDataset;

    }
}
